Spark算子学习之四、其他算子 您所在的位置:网站首页 rdd sortby排序 Spark算子学习之四、其他算子

Spark算子学习之四、其他算子

#Spark算子学习之四、其他算子| 来源: 网络整理| 查看: 265

1.sortBy 1)函数签名 def sortBy[K]( f: (T) => K, ascending: Boolean = true, numPartitions: Int = this.partitions.length) (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] def sortBy[K]( f: (T) => K, ascending: Boolean = true, numPartitions: Int = this.partitions.length) (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] 2)功能描述

该操作用于排序数据。在排序之前,可以将数据通过f函数进行处理,之后按照f函数处理的结果进行排序,默认为正序排列。注意:排序后新产生的RDD的分区数与原RDD的分区数一致。

3)简单案例 案例一、 sc.makeRDD(List((2, "a"), (2, "a"), (2, "a"), (2, "a"), (3, "b"), (5, "c")), 3) .sortBy(_._1, true).collect().foreach(println)

结果 (2,a) (2,a) (2,a) (2,a) (3,b) (5,c)

案例二、 sc.makeRDD(List((2, "a"), (2, "a"), (2, "a"), (2, "a"), (3, "b"), (5, "c")), 3) .sortBy(_._1, true).mapPartitionsWithIndex( (index, datas) => { datas.map((index, _)) } ).foreach(println)

(1,(3,b)) (0,(2,a)) (2,(5,c)) (0,(2,a)) (0,(2,a)) (0,(2,a)) 从另个案例结果可以看到,其实sortby在处理数据时使用了range分区,不同分区取得值是某个范围内的数据,分区之间数据不重叠,但是从案例二结果中可以看到,0号分区中的数据很多,如果数据量大则可能出现数据倾斜。

2.filter 1)函数签名 def filter(f: T => Boolean): RDD[T] 2)功能描述

接收一个返回值为布尔类型的函数作为参数。当某个RDD调用filter方法时,会对该RDD中每一个元素应用f函数,如果返回值类型为true,则该元素会被添加到新的RDD中,不会改变分区。

3)简单案例 val result = sc.makeRDD(List("abc", "bac", "ad", "cb"), 3) result.filter(_.startsWith("a")) .mapPartitionsWithIndex((index, datas) => { datas.map((index, _)) }) .foreach(println)

结果 (2,ad) (0,abc) filter结果很简单,由于filter算子不改变分区数量,所以在生产环境可能会出现数据倾斜,因为不同分区过滤掉的数据量不同。

3.glom 1)函数签名 def glom(): RDD[Array[T]] 2)功能描述

将RDD中每一个分区变成一个数组,并放置在新的RDD中,数组中元素的类型与原分区中元素类型一致。

3)简单案例 sc.makeRDD(List(4, 2, 1, 3), 2) .mapPartitionsWithIndex((index, datas) => { datas.map((s"分区$index:" + _)) }).foreach(println) println("xxxxxxxxxxx") //重点关注返回结果数据类型 val result:RDD[Array[Int]] = sc.makeRDD(List(4, 2, 1, 3), 2) .glom() result.mapPartitionsWithIndex((index, datas) => { datas.map { list => { for (l datas.map((index, _)) }).foreach(println) println("xxxxxxxxxxxxxxxx") result.flatMap(_.iterator) .distinct() .mapPartitionsWithIndex((index, datas) => { datas.map((index, _)) }).foreach(println)

结果 (2,ad) (1,bac) (0,abc) (2,cb) xxxxxxxxxxxxxxxx (2,b) (1,d) (0,c) (1,a)

5.coalesce 1)函数签名 def coalesce(numPartitions: Int, shuffle: Boolean = false, //默认false不执行shuffle partitionCoalescer: Option[PartitionCoalescer] = Option.empty) (implicit ord: Ordering[T] = null) : RDD[T] 2)功能描述

缩减分区数,用于大数据集过滤后,提高小数据集的执行效率。coalesce方法包含两种情况,一种使用shuffle一种不使用shuffle。

3)简单案例 案例一、不使用shuffle扩大分区 val result = sc.makeRDD(List("a", "b", "c", "d"),1) result .coalesce(2,false) .mapPartitionsWithIndex((index,datas) => { datas.map((index,_)) }).foreach(println)

结果 (0,a) (0,b) (0,c) (0,d) 注意这里虽然将分区数从2个变成3个,但是最终结果还是两个分区。

案例二、使用shuffle扩大分区 val result = sc.makeRDD(List("a", "b", "c", "d"),1) result .coalesce(2,true) .mapPartitionsWithIndex((index,datas) => { datas.map((index,_)) }).foreach(println)

结果 (0,a) (1,b) (0,c) (1,d)

案例三、缩小分区 val result = sc.makeRDD(List("a", "b", "c", "d"),2) result .coalesce(1) .mapPartitionsWithIndex((index,datas) => { datas.map((index,_)) }).foreach(println)

结果 (0,a) (0,b) (0,c) (0,d) 结合以上几个案例,其实coalesce一般用来缩小分区。

6.repartition 1)函数签名 def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] 2)功能描述

该操作内部其实执行的是coalesce操作,参数shuffle的默认值为true。无论是将分区数多的RDD转换为分区数少的RDD,还是将分区数少的RDD转换为分区数多的RDD,repartition操作都可以完成,因为无论如何都会经shuffle过程。

3)简单案例 sc.makeRDD(List(2,3,5),1) .repartition(2) .mapPartitionsWithIndex((index,datas) =>{ datas.map((index,_)) }).collect().foreach(println)

结果 (0,2) (0,5) (1,3)

4)coalesce和repartition区别

1)coalesce重新分区,可以选择是否进行shuffle过程。由参数shuffle: Boolean = false/true决定。 2)repartition实际上是调用的coalesce,进行shuffle。源码如下:

def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope { coalesce(numPartitions, shuffle = true) }

3)coalesce一般为缩减分区,如果扩大分区,也不会增加分区总数,意义不大。repartition扩大分区执行shuffle,可以达到扩大分区的效果。



【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有